SQS FIFO キューで処理に失敗したときの実行順序を調べてみた

SQS FIFO キューで処理に失敗したときの実行順序を調べてみた

SQS FIFO キューで処理に失敗した場合に実行順序がどうなるのかを調べてみました
Clock Icon2020.01.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

西田@大阪です

今回はSQS FIFO キューで処理に失敗した場合に実行順序がどうなるのかを調べてみました

SQS の FIFO とは

SQS の FIFOキューについては以下の記事を参照ください

【新機能】Amazon SQSにFIFOが追加されました!(重複削除/単一実行/順序取得に対応)

標準キューと違って以下の点が特徴です

  • キューの重複がおこりません
  • キューの順序が保証されます

疑問

SQS FIFO のドキュメントから抜粋です

複数回の再試行

■ コンシューマーが失敗した SendMessage アクションを検出した場合、同じメッセージ重複排除 ID を使用して必要に応じて何回でも送信を再試行できます。重複排除の期間が終了する前にプロデューサーが少なくとも 1 つの確認を受信した場合、複数の再試行を行ってもメッセージの順序に影響したり、重複が発生したりすることはありません。

■ コンシューマーが失敗した ReceiveMessage アクションを検出した場合、同じ受信リクエスト試行 ID を使用して必要に応じて何回でも再試行できます。可視性タイムアウトが終了する前にコンシューマーが少なくとも 1 つの確認を受信した場合、複数の再試行を行ってもメッセージの順序に影響はありません。

■ メッセージグループ ID を含むメッセージを受信した場合、メッセージを削除するか、メッセージが表示されるまで、同じメッセージグループ ID のメッセージはそれ以上返されません。

ドキュメントからは以下のことは保証されていそうです

  • 再試行されてもューの順序は守られる
  • 再試行されてもキューの重複はおこらない

ただ、以下のケースについてはうまく読み取れませんでした

  • 複数キューが失敗した場合は再試行される順序は保証されるのか?
  • 複数サブスクライバーがいた場合、失敗したキューが Dead Letter Queueにいくまですべてのコンシューマーはブロックされるのか?

先に答えから

先に答えから書きます

  • 失敗しても順番は 保証されます
  • 失敗したキューが Dead Letter Queue に行くまで すべてのサブスクライバーはブロックされます

VisibilityTimeoutを超えてキューが処理が続行してしまう場合は順番は保証されず、複数回実行される点をご注意ください

試してみた

以下の設定で Dead Letter Queue に行くSQS FIFOキューを作成します

  • VisibilityTimeout が 3秒
  • 2回失敗
AWSTemplateFormatVersion: "2010-09-09"
Description: "SQS FIFO Template"
Resources:
  CreateSetuppackDeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: sqs-sample-dl.fifo
      FifoQueue: true
  CreateSetuppackQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: sqs-sample.fifo
      FifoQueue: true
      ContentBasedDeduplication: false
      RedrivePolicy:
        deadLetterTargetArn:
          Fn::GetAtt:
            - CreateSetuppackDeadLetterQueue
            - Arn
        maxReceiveCount: 2
      VisibilityTimeout: 3

パブリッシャー側のコードです

def enqueue(n=1):
    for i in range(n):
        sqs.send_message(
            QueueUrl=SQS_URL,
            MessageBody=f"{i}",
            MessageGroupId="test",
            MessageDeduplicationId=str(uuid.uuid4())
        )

渡された引数の数値の数だけ "1", "2" ... と数字(str)をエンキューしていきます

サブスクライバー側のコードです。

def dequeue():
    print(threading.get_ident())

    while True:
        response = sqs.receive_message(
            QueueUrl=SQS_URL,
            MaxNumberOfMessages=1
        )

        if 'Messages' not in response:
            time.sleep(0.1)
            continue

        for message in response['Messages']:
            if message['Body'] == '15':
                print(f"{threading.get_ident()} {datetime.datetime.now()} {message['Body']} failed")
                time.sleep(5)
            else:
                print(f"{threading.get_ident()} {datetime.datetime.now()} {message['Body']}")

                handle = message['ReceiptHandle']

                try:
                    sqs.delete_message(
                        QueueUrl=SQS_URL,
                        ReceiptHandle=handle
                    )
                except ClientError as e:
                    print(e)


enqueue(20)

with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(dequeue)
    executor.submit(dequeue)
    executor.submit(dequeue)

メッセージを受信しメッセージの内容が'15'であれば VisibilityTimeout より長めにスリープしています

実行結果です

スレッドID, 実行時間, 処理したメッセージの内容 の順に表示されています

123145397649408
123145414438912
123145431228416
123145397649408 2020-01-31 08:20:25.727753 0
123145397649408 2020-01-31 08:20:25.794476 1
123145397649408 2020-01-31 08:20:25.865409 2
123145397649408 2020-01-31 08:20:25.930104 3
123145431228416 2020-01-31 08:20:25.993035 4
123145431228416 2020-01-31 08:20:26.057589 5
123145431228416 2020-01-31 08:20:26.133130 6
123145397649408 2020-01-31 08:20:26.173065 7
123145397649408 2020-01-31 08:20:26.237981 8
123145397649408 2020-01-31 08:20:26.303023 9
123145414438912 2020-01-31 08:20:26.342946 10
123145414438912 2020-01-31 08:20:26.397174 11
123145414438912 2020-01-31 08:20:26.462444 12
123145414438912 2020-01-31 08:20:26.527355 13
123145414438912 2020-01-31 08:20:26.588985 14
123145414438912 2020-01-31 08:20:26.653587 15 failed
123145397649408 2020-01-31 08:20:29.719290 15 failed
123145414438912 2020-01-31 08:20:32.746732 16
123145414438912 2020-01-31 08:20:32.801750 17
123145414438912 2020-01-31 08:20:32.867375 18
123145414438912 2020-01-31 08:20:32.934917 19

0から順に処理され 15 で2回失敗する間すべてのサブスクライバーが止まってることが確認できると思います

最後に

この記事が誰かの参考になれば幸いです

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.